#!/usr/bin/env python
# Copyright 2014-2015 RethinkDB, all rights reserved.

import os, sys, time

sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir, 'common'))
import utils, rdb_unittest

# --

class Changefeeds_Basic(rdb_unittest.RdbTestCase):
    '''Basic tests'''
    
    shards = 1
    replicas = 3
    recordsToGenerate = 10
    
    def test_simple(self):
        '''Make simple changes and ensure a single changefeed sees them'''
        
        server = self.cluster[0]
        conn = self.r.connect(host=server.host, port=server.driver_port)
        
        expectedCount = self.samplesPerShard * len(utils.getShardRanges(self.conn, self.tableName))
        with utils.NextWithTimeout(self.table.changes().limit(expectedCount).run(conn)) as changefeed:
            expectedChangedIds = self.makeChanges()
            self.assertEqual(expectedChangedIds, sorted([x['new_val']['id'] for x in changefeed]))
    
    def test_multiple_servers(self):
        '''The same changefeed on multiple servers should get the same results'''
        
        connections = [self.r.connect(host=x.host, port=x.driver_port) for x in self.cluster]
        expectedCount = self.samplesPerShard * len(utils.getShardRanges(connections[0], self.tableName))
        changefeeds = [utils.NextWithTimeout(self.table.changes().limit(expectedCount).run(x)) for x in connections]
        
        # add data across all of the connections
        
        expectedResults = self.makeChanges()
        
        # verify that all of the feeds got the expected results
        
        for feed in changefeeds:
            feedResults = sorted([x['new_val']['id'] for x in feed])
            self.assertEqual(feedResults, expectedResults)
    
    def test_same_change(self):
        '''The same change made repeatedly should only appear once in a changefeed'''
        
        feed = self.table.changes().run(self.conn)
        updateItemId = self.table.nth(0)['id'].run(self.conn)
        
        # update the item
        
        self.table.get(updateItemId).update({'update':1}).run(self.conn)
        feed.next(wait=.2)
        
        # make the same update multiple times showing there are no further entries
        
        for _ in range(30):
            self.table.get(updateItemId).update({'update':1}).run(self.conn)
            self.assertRaises(self.r.ReqlTimeoutError, feed.next, wait=.2)
        
        # other changes should work
        
        self.table.get(updateItemId).update({'update':2}).run(self.conn)
        feed.next(wait=.2)
    
    def test_primary_falure(self):
        '''Test that a changefeed is closed at the death of the primary replica for a shard'''
        self.destructiveTest = True
        
        primary = self.getPrimaryForShard(0)
        replica = self.getReplicaForShard(0)
        
        conn = self.r.connect(host=replica.host, port=replica.driver_port)
        shardInfo = self.table.config()['shards'][0].run(conn)
        
        self.assertTrue(primary is not None)
        self.assertTrue(primary.name in shardInfo['primary_replica'], msg='Primary is not a primary')
        
        self.assertTrue(replica is not None)
        self.assertTrue(replica.name not in shardInfo['primary_replica'], msg='Replica is a primary')
        self.assertTrue(replica.name in shardInfo['replicas'], msg='Replica is not in replicas')
        
        # get the id of something in that range
        
        shardRange = utils.getShardRanges(conn, table=self.tableName, db=self.dbName)[0]
        updateItemId = self.table.between(shardRange[0], shardRange[1]).nth(0)['id'].run(conn)
        
        # start the changefeed
        
        changefeed = self.table.changes().run(conn)
        
        # make a change and retrieve it
                    
        self.table.get(updateItemId).update({'update':1}).run(conn)
        changefeed.next(wait=.2)
        
        # kill the primary
        
        primary.kill()
        self.table.wait(wait_for='ready_for_writes', timeout=20).run(conn)
        
        # the changefeed should have closed
        
        self.table.get(updateItemId).update({'update':1}).run(conn)
        self.assertRaises(self.r.ReqlRuntimeError, changefeed.next, wait=.2)

    def test_secondary_failure(self):
        '''Test when a secondary shardholder fails for a range'''
        self.destructiveTest = True
        
        primary = self.getPrimaryForShard(0)
        replica = self.getReplicaForShard(0)
        
        conn = self.r.connect(host=primary.host, port=primary.driver_port)
        shardInfo = self.table.config()['shards'][0].run(conn)
        
        self.assertTrue(primary is not None)
        self.assertTrue(primary.name in shardInfo['primary_replica'], msg='Primary is not a primary')
        
        self.assertTrue(replica is not None)
        self.assertTrue(replica.name not in shardInfo['primary_replica'], msg='Replica is a primary')
        self.assertTrue(replica.name in shardInfo['replicas'], msg='Replica is not in replicas')
        
        # start the changefeed
        
        changefeed = self.table.changes().run(conn)
        
        # locate an item inside the range
        
        targetRange = utils.getShardRanges(conn, table=self.tableName, db=self.dbName)[0]
        updateItemId = self.table.between(targetRange[0], targetRange[1]).nth(0).run(conn)['id']
        
        # add a change and retrieve it
        
        self.table.get(updateItemId).update({'update':1}).run(conn)
        changefeed.next(wait=.2)
        
        # kill a secondary server
        
        replica.kill()
        self.table.wait(wait_for='ready_for_writes', timeout=20).run(conn)
        
        # add another change and retrieve it
        
        self.table.get(updateItemId).update({'update':2}).run(conn)
        changefeed.next(wait=.2)
    
    def test_connection_death(self):
        '''Test that the client handles the death of the server at the other end of the connection correctly'''
        self.destructiveTest = True
        
        stable = self.getPrimaryForShard(0)
        sacrifice = self.getReplicaForShard(0)
        
        stable_conn = self.r.connect(host=stable.host, port=stable.driver_port)
        sacrifice_conn = self.r.connect(host=sacrifice.host, port=sacrifice.driver_port)
        
        # start the changefeed
        
        changefeed = self.table.changes().run(sacrifice_conn)
        
        # add a change and retrieve it
        
        self.table.insert({}).run(stable_conn)
        next(changefeed)
        
        # kill a primary server
        
        sacrifice.kill()
        
        # check that we error
        
        self.table.insert({}).run(stable_conn)
        self.assertRaises(self.r.ReqlDriverError, next, changefeed)

# - generate the table drop tests

for feed in [
    { 'name':'table',   'feed':lambda self: self.table.changes().run(self.conn) },
    { 'name':'point',   'feed':lambda self: self.table.get(self.table.nth(1)['id']).changes().run(self.conn) },
    { 'name':'between', 'feed':lambda self: self.table.between(self.r.minval, self.r.maxval).changes().run(self.conn) }
]:
    def test_table_drop_test_generator(feed):
        def test(self):
            '''Test that a %s changefeed is closed when the table it references is dropped''' % feed['name']
            
            # start the feed
            
            runningFeed = feed['feed'](self)
            
            # empty the feed of any inital values
            
            try:
                while True:
                    runningFeed.next(wait=.1)
            except self.r.ReqlTimeoutError: pass
            
            # drop the table
            
            self.r.db(self.dbName).table_drop(self.tableName).run(self.conn)
            
            # check the feed errors
            
            result = None
            try:
                result = runningFeed.next(wait=.2)
            except self.r.ReqlRuntimeError:
                pass # expected result
            except self.r.ReqlTimeoutError:
                raise AssertionError('%s feed timed out rather than getting a ReqlRuntimeError error' % feed['name'])
            except Exception as e:
                raise AssertionError('expected ReqlRuntimeError, but raised: %r' % e)
            else:
                raise AssertionError('expected ReqlRuntimeError, but got value: %r' % result)
        return test
    testName = 'test_table_drop_%s_feed' % feed['name']
    setattr(Changefeeds_Basic, testName, test_table_drop_test_generator(feed))

# ==== main

if __name__ == '__main__':
    rdb_unittest.main()
